package com.jie.flink.cdc.flinksink.config;

import com.jie.flink.cdc.doman.FlinkCdcSinkType;
import com.jie.flink.cdc.doman.FlinkCdcStreamProperties;
import com.jie.flink.cdc.util.ColumnUtil;
import com.jie.flink.cdc.util.JsonUtils;
import org.apache.flink.api.connector.sink2.Sink;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author zhanggj
 * @date 2023/5/29 9:39
 * @desc
 */
public class SinkConfigPropertiesFactory {
    public static List<Sink<String>> buildSinkList(final Map streamMap) {
        final List<Map> sinkConfigMapList = (List<Map>) streamMap.get(ColumnUtil.getFieldName(FlinkCdcStreamProperties :: getSinkConfigList));
        final List<SinkConfigProperties> sinkConfigList = new ArrayList<>();
        sinkConfigMapList.forEach(sinkConfigMap -> {
            final FlinkCdcSinkType sinkType = Enum.valueOf(FlinkCdcSinkType.class,
                    (String) sinkConfigMap.get(ColumnUtil.getFieldName(SinkConfigProperties :: getSinkType)));
            final SinkConfigProperties config = JsonUtils.mapToObject(sinkConfigMap, sinkType.getSinkConfigClass());

            sinkConfigList.add(config);
        });

        final List<Sink<String>> sinkList = new ArrayList<>();
        sinkConfigList.forEach(config -> {
            sinkList.add(SinkConfigProperties.sinkMap.get(config.getKey()).buildSink(config));
        });
        return sinkList;
    }
}
